From 8f5f88adee39cb71bba062505f8fb3a69b02be88 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 18 Mar 2022 11:57:53 +0000 Subject: [PATCH] Support intra-process communication: Clients & Services (ros2#1847) Signed-off-by: Mauro Passerino --- rclcpp/CMakeLists.txt | 2 + rclcpp/include/rclcpp/client.hpp | 122 +++++++++- rclcpp/include/rclcpp/create_client.hpp | 6 +- rclcpp/include/rclcpp/create_service.hpp | 7 +- .../detail/resolve_use_intra_process.hpp | 6 +- .../buffers/intra_process_buffer.hpp | 45 ++++ .../experimental/client_intra_process.hpp | 142 ++++++++++++ .../client_intra_process_base.hpp | 196 ++++++++++++++++ .../create_intra_process_buffer.hpp | 20 ++ .../experimental/intra_process_manager.hpp | 122 +++++++++- .../experimental/service_intra_process.hpp | 160 +++++++++++++ .../service_intra_process_base.hpp | 211 ++++++++++++++++++ .../include/rclcpp/intra_process_setting.hpp | 6 +- rclcpp/include/rclcpp/node.hpp | 8 +- rclcpp/include/rclcpp/node_impl.hpp | 12 +- rclcpp/include/rclcpp/publisher.hpp | 2 +- rclcpp/include/rclcpp/service.hpp | 119 +++++++++- rclcpp/include/rclcpp/subscription.hpp | 2 +- rclcpp/include/rclcpp/subscription_base.hpp | 2 +- rclcpp/src/rclcpp/client.cpp | 51 +++++ .../src/rclcpp/client_intra_process_base.cpp | 36 +++ rclcpp/src/rclcpp/intra_process_manager.cpp | 172 +++++++++++++- .../rclcpp/node_interfaces/node_services.cpp | 12 + rclcpp/src/rclcpp/service.cpp | 57 ++++- .../src/rclcpp/service_intra_process_base.cpp | 45 ++++ .../node_interfaces/test_node_services.cpp | 5 +- rclcpp/test/rclcpp/test_client.cpp | 6 +- .../test_externally_defined_services.cpp | 6 +- .../rclcpp/test_intra_process_manager.cpp | 107 +++++++++ rclcpp/test/rclcpp/test_service.cpp | 17 +- .../src/lifecycle_node_interface_impl.cpp | 37 ++- .../src/lifecycle_node_interface_impl.hpp | 1 + 32 files changed, 1676 insertions(+), 66 deletions(-) create mode 100644 rclcpp/include/rclcpp/experimental/client_intra_process.hpp create mode 100644 rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp create mode 100644 rclcpp/include/rclcpp/experimental/service_intra_process.hpp create mode 100644 rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp create mode 100644 rclcpp/src/rclcpp/client_intra_process_base.cpp create mode 100644 rclcpp/src/rclcpp/service_intra_process_base.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index b2d1023064..51c1c4ecc1 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/any_executable.cpp src/rclcpp/callback_group.cpp src/rclcpp/client.cpp + src/rclcpp/client_intra_process_base.cpp src/rclcpp/clock.cpp src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp @@ -108,6 +109,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/serialization.cpp src/rclcpp/serialized_message.cpp src/rclcpp/service.cpp + src/rclcpp/service_intra_process_base.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription_base.cpp src/rclcpp/subscription_intra_process_base.cpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 92f2f4af8d..503d6ef6f8 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -37,9 +37,13 @@ #include "rclcpp/clock.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/function_traits.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp" @@ -48,6 +52,8 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcutils/logging_macros.h" + #include "rmw/error_handling.h" #include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" @@ -133,7 +139,7 @@ class ClientBase rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph); RCLCPP_PUBLIC - virtual ~ClientBase() = default; + virtual ~ClientBase(); /// Take the next response for this client as a type erased pointer. /** @@ -254,6 +260,15 @@ class ClientBase rclcpp::QoS get_response_subscription_actual_qos() const; + /// Return the waitable for intra-process + /** + * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup. + * \throws std::runtime_error if the intra process manager is destroyed + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_intra_process_waitable(); + /// Set a callback to be called when each new response is received. /** * The callback receives a size_t which is the number of responses received @@ -358,6 +373,19 @@ class ClientBase void set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data); + using IntraProcessManagerWeakPtr = + std::weak_ptr; + + /// Implementation detail. + RCLCPP_PUBLIC + void + setup_intra_process( + uint64_t intra_process_client_id, + IntraProcessManagerWeakPtr weak_ipm); + + std::shared_ptr client_intra_process_; + std::atomic_uint ipc_sequence_number_{1}; + rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; std::shared_ptr node_handle_; std::shared_ptr context_; @@ -373,6 +401,11 @@ class ClientBase std::shared_ptr client_handle_; std::atomic in_use_by_wait_set_{false}; + + std::recursive_mutex ipc_mutex_; + bool use_intra_process_{false}; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_client_id_; }; template @@ -468,12 +501,14 @@ class Client : public ClientBase * \param[in] node_graph The node graph interface of the corresponding node. * \param[in] service_name Name of the topic to publish to. * \param[in] client_options options for the subscription. + * \param[in] ipc_setting Intra-process communication setting for the client. */ Client( rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string & service_name, - rcl_client_options_t & client_options) + rcl_client_options_t & client_options, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) : ClientBase(node_base, node_graph), srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { @@ -496,10 +531,27 @@ class Client : public ClientBase } rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client"); } + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_client(); + } } virtual ~Client() { + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a client."); + return; + } + ipm->remove_client(intra_process_client_id_); } /// Take the next response for this client. @@ -616,7 +668,7 @@ class Client : public ClientBase Promise promise; auto future = promise.get_future(); auto req_id = async_send_request_impl( - *request, + std::move(request), std::move(promise)); return FutureAndRequestId(std::move(future), req_id); } @@ -651,7 +703,7 @@ class Client : public ClientBase Promise promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( - *request, + std::move(request), std::make_tuple( CallbackType{std::forward(cb)}, shared_future, @@ -682,7 +734,7 @@ class Client : public ClientBase PromiseWithRequest promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( - *request, + request, std::make_tuple( CallbackWithRequestType{std::forward(cb)}, request, @@ -824,11 +876,33 @@ class Client : public ClientBase CallbackWithRequestTypeValueVariant>; int64_t - async_send_request_impl(const Request & request, CallbackInfoVariant value) + async_send_request_impl(SharedRequest request, CallbackInfoVariant value) { + std::lock_guard lock(ipc_mutex_); + if (use_intra_process_) { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process send called after destruction of intra process manager"); + } + bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_); + + // Check if there's an intra-process server available matching this client. + // If there's not, we fall back into inter-process communication, since + // the server might be available in another process or was configured to not use IPC. + if (intra_process_server_available) { + // Send intra-process request + ipm->send_intra_process_client_request( + intra_process_client_id_, + std::make_pair(std::move(request), std::move(value))); + return ipc_sequence_number_++; + } + } + + // Send inter-process request int64_t sequence_number; std::lock_guard lock(pending_requests_mutex_); - rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number); + rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); } @@ -854,6 +928,40 @@ class Client : public ClientBase return value; } + void + create_intra_process_client() + { + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_response_subscription_actual_qos(); + + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { + throw std::invalid_argument( + "intraprocess communication allowed only with keep last history qos policy"); + } + if (qos_profile.depth() == 0) { + throw std::invalid_argument( + "intraprocess communication is not allowed with 0 depth qos policy"); + } + if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { + throw std::invalid_argument( + "intraprocess communication allowed only with volatile durability"); + } + + // Create a ClientIntraProcess which will be given to the intra-process manager. + using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess; + + client_intra_process_ = std::make_shared( + context_, + this->get_service_name(), + qos_profile); + + // Add it to the intra process manager. + using rclcpp::experimental::IntraProcessManager; + auto ipm = context_->get_sub_context(); + uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_); + this->setup_intra_process(intra_process_client_id, ipm); + } + RCLCPP_DISABLE_COPY(Client) std::unordered_map< diff --git a/rclcpp/include/rclcpp/create_client.hpp b/rclcpp/include/rclcpp/create_client.hpp index 00e6ff3c0e..78dc681877 100644 --- a/rclcpp/include/rclcpp/create_client.hpp +++ b/rclcpp/include/rclcpp/create_client.hpp @@ -65,7 +65,8 @@ create_client( std::shared_ptr node_services, const std::string & service_name, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) { rcl_client_options_t options = rcl_client_get_default_options(); options.qos = qos_profile; @@ -74,7 +75,8 @@ create_client( node_base.get(), node_graph, service_name, - options); + options, + ipc_setting); auto cli_base_ptr = std::dynamic_pointer_cast(cli); node_services->add_client(cli_base_ptr, group); diff --git a/rclcpp/include/rclcpp/create_service.hpp b/rclcpp/include/rclcpp/create_service.hpp index 42c253a526..031daf42a0 100644 --- a/rclcpp/include/rclcpp/create_service.hpp +++ b/rclcpp/include/rclcpp/create_service.hpp @@ -63,7 +63,8 @@ create_service( const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) { rclcpp::AnyServiceCallback any_service_callback; any_service_callback.set(std::forward(callback)); @@ -72,8 +73,8 @@ create_service( service_options.qos = qos_profile; auto serv = Service::make_shared( - node_base->get_shared_rcl_node_handle(), - service_name, any_service_callback, service_options); + node_base, + service_name, any_service_callback, service_options, ipc_setting); auto serv_base_ptr = std::dynamic_pointer_cast(serv); node_services->add_service(serv_base_ptr, group); return serv; diff --git a/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp index 9098bfe695..765b0132b1 100644 --- a/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp @@ -26,12 +26,12 @@ namespace detail { /// Return whether or not intra process is enabled, resolving "NodeDefault" if needed. -template +template bool -resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base) +resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base) { bool use_intra_process; - switch (options.use_intra_process_comm) { + switch (ipc_setting) { case IntraProcessSetting::Enable: use_intra_process = true; break; diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index 05092bb23b..b6205743c6 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -239,6 +239,51 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer +class ServiceIntraProcessBuffer : public IntraProcessBufferBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBuffer) + + virtual ~ServiceIntraProcessBuffer() {} + + explicit + ServiceIntraProcessBuffer( + std::unique_ptr> buffer_impl) + { + buffer_ = std::move(buffer_impl); + } + + bool use_take_shared_method() const override + { + return false; + } + + bool has_data() const override + { + return buffer_->has_data(); + } + + void clear() override + { + buffer_->clear(); + } + + void add(BufferT && msg) + { + buffer_->enqueue(std::move(msg)); + } + + BufferT + consume() + { + return buffer_->dequeue(); + } + +private: + std::unique_ptr> buffer_; +}; + } // namespace buffers } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp new file mode 100644 index 0000000000..fb30e6ee81 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp @@ -0,0 +1,142 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ + +#include +#include +#include +#include +#include +#include +#include // NOLINT, cpplint doesn't think this is a cpp std header + +#include "rcutils/logging_macros.h" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +template +class ClientIntraProcess : public ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClientIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using ServiceResponse = std::pair; + + ClientIntraProcess( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : ClientIntraProcessBase(context, service_name, qos_profile) + { + // Create the intra-process buffer. + buffer_ = rclcpp::experimental::create_service_intra_process_buffer< + ServiceResponse>(qos_profile); + } + + virtual ~ClientIntraProcess() = default; + + bool + is_ready(rcl_wait_set_t * wait_set) + { + (void) wait_set; + return buffer_->has_data(); + } + + void + store_intra_process_response(ServiceResponse && response) + { + buffer_->add(std::move(response)); + gc_.trigger(); + invoke_on_new_response(); + } + + std::shared_ptr + take_data() override + { + auto data = std::make_shared(std::move(buffer_->consume())); + return std::static_pointer_cast(data); + } + + void execute(std::shared_ptr & data) + { + if (!data) { + throw std::runtime_error("'data' is empty"); + } + + auto data_ptr = std::static_pointer_cast(data); + auto & typed_response = data_ptr->first; + auto & value = data_ptr->second; + + if (std::holds_alternative(value)) { + auto & promise = std::get(value); + promise.set_value(std::move(typed_response)); + } else if (std::holds_alternative(value)) { + auto & inner = std::get(value); + const auto & callback = std::get(inner); + auto & promise = std::get(inner); + auto & future = std::get(inner); + promise.set_value(std::move(typed_response)); + callback(std::move(future)); + } else if (std::holds_alternative(value)) { + auto & inner = std::get(value); + const auto & callback = std::get(inner); + auto & promise = std::get(inner); + auto & future = std::get(inner); + auto & request = std::get(inner); + promise.set_value(std::make_pair(std::move(request), std::move(typed_response))); + callback(std::move(future)); + } + } + +protected: + using BufferUniquePtr = + typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< + ServiceResponse>::UniquePtr; + + BufferUniquePtr buffer_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp new file mode 100644 index 0000000000..5c59d8e421 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp @@ -0,0 +1,196 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ + +#include +#include +#include +#include + +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/context.hpp" +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +class ClientIntraProcessBase : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase) + + enum class EntityType : std::size_t + { + Client, + }; + + RCLCPP_PUBLIC + ClientIntraProcessBase( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : gc_(context), service_name_(service_name), qos_profile_(qos_profile) + {} + + virtual ~ClientIntraProcessBase() = default; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() {return 1;} + + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + virtual bool + is_ready(rcl_wait_set_t * wait_set) = 0; + + virtual + std::shared_ptr + take_data() = 0; + + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + + virtual void + execute(std::shared_ptr & data) = 0; + + RCLCPP_PUBLIC + const char * + get_service_name() const; + + RCLCPP_PUBLIC + QoS + get_actual_qos() const; + + /// Set a callback to be called when each new response arrives. + /** + * The callback receives a size_t which is the number of responses received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if responses were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new response is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Client)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_response_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_response_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = nullptr; + } + +protected: + std::recursive_mutex reentrant_mutex_; + rclcpp::GuardCondition gc_; + std::function on_new_response_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_response() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_response_callback_) { + on_new_response_callback_(1); + } else { + unread_count_++; + } + } + +private: + std::string service_name_; + QoS qos_profile_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp index 4d7668b964..262aec96a3 100644 --- a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp @@ -92,6 +92,26 @@ create_intra_process_buffer( return buffer; } +template +typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer::UniquePtr +create_service_intra_process_buffer(const rclcpp::QoS & qos) +{ + using rclcpp::experimental::buffers::RingBufferImplementation; + + size_t buffer_size = qos.depth(); + auto buffer_impl = std::make_unique>(buffer_size); + + using rclcpp::experimental::buffers::ServiceIntraProcessBuffer; + typename ServiceIntraProcessBuffer::UniquePtr buffer; + + // Construct the intra_process_buffer + buffer = + std::make_unique>( + std::move(buffer_impl)); + + return buffer; +} + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 11f2dda6a4..458bbbbf1e 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -28,7 +28,11 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" +#include "rclcpp/experimental/service_intra_process.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" @@ -97,10 +101,10 @@ class IntraProcessManager RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager) RCLCPP_PUBLIC - IntraProcessManager(); + IntraProcessManager() = default; RCLCPP_PUBLIC - virtual ~IntraProcessManager(); + virtual ~IntraProcessManager() = default; /// Register a subscription with the manager, returns subscriptions unique id. /** @@ -116,6 +120,24 @@ class IntraProcessManager uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + /// Register an intra-process client with the manager, returns the client unique id. + /** + * \param client the ClientIntraProcessBase to register. + * \return an unsigned 64-bit integer which is the client's unique id. + */ + RCLCPP_PUBLIC + uint64_t + add_intra_process_client(rclcpp::experimental::ClientIntraProcessBase::SharedPtr client); + + /// Register an intra-process service with the manager, returns the service unique id. + /** + * \param service the Service IntraProcessBase to register. + * \return an unsigned 64-bit integer which is the service's unique id. + */ + RCLCPP_PUBLIC + uint64_t + add_intra_process_service(rclcpp::experimental::ServiceIntraProcessBase::SharedPtr service); + /// Unregister a subscription using the subscription's unique id. /** * This method does not allocate memory. @@ -126,6 +148,22 @@ class IntraProcessManager void remove_subscription(uint64_t intra_process_subscription_id); + /// Unregister a client using the client's unique id. + /** + * \param intra_process_client_id id of the client to remove. + */ + RCLCPP_PUBLIC + void + remove_client(uint64_t intra_process_client_id); + + /// Unregister a service using the service's unique id. + /** + * \param intra_process_service_id id of the service to remove. + */ + RCLCPP_PUBLIC + void + remove_service(uint64_t intra_process_service_id); + /// Register a publisher with the manager, returns the publisher unique id. /** * This method stores the publisher intra process object, together with @@ -237,6 +275,56 @@ class IntraProcessManager } } + /// Send an intra-process client request + /** + * Using the intra-process client id, a matching intra-process service is retrieved + * which will store the request to process it asynchronously. + * + * \param intra_process_client_id the id of the client sending the request + * \param request the client's request plus callbacks if any. + */ + template + void + send_intra_process_client_request( + uint64_t intra_process_client_id, + RequestT request) + { + std::unique_lock lock(mutex_); + + auto client_it = clients_to_services_.find(intra_process_client_id); + + if (client_it == clients_to_services_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling send_intra_process_client_request for invalid or no " + "longer existing client id"); + return; + } + uint64_t service_id = client_it->second; + + auto service_it = services_.find(service_id); + if (service_it == services_.end()) { + auto cli = get_client_intra_process(intra_process_client_id); + auto warning_msg = + "Intra-process service gone out of scope. " + "Do inter-process requests.\n" + "Client topic name: " + std::string(cli->get_service_name()); + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), warning_msg.c_str()); + return; + } + auto service_intra_process_base = service_it->second.lock(); + if (service_intra_process_base) { + auto service = std::dynamic_pointer_cast< + rclcpp::experimental::ServiceIntraProcess>(service_intra_process_base); + if (service) { + service->store_intra_process_request( + intra_process_client_id, std::move(request)); + } + } else { + services_.erase(service_it); + } + } + template< typename MessageT, typename ROSMessageType, @@ -306,6 +394,18 @@ class IntraProcessManager rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id); + RCLCPP_PUBLIC + rclcpp::experimental::ClientIntraProcessBase::SharedPtr + get_client_intra_process(uint64_t intra_process_client_id); + + RCLCPP_PUBLIC + rclcpp::experimental::ServiceIntraProcessBase::SharedPtr + get_service_intra_process(uint64_t intra_process_service_id); + + RCLCPP_PUBLIC + bool + service_is_available(uint64_t intra_process_client_id); + private: struct SplittedSubscriptions { @@ -322,6 +422,15 @@ class IntraProcessManager using PublisherToSubscriptionIdsMap = std::unordered_map; + using ClientMap = + std::unordered_map; + + using ServiceMap = + std::unordered_map; + + using ClientToServiceIdsMap = + std::unordered_map; + RCLCPP_PUBLIC static uint64_t @@ -337,6 +446,12 @@ class IntraProcessManager rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; + RCLCPP_PUBLIC + bool + can_communicate( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + rclcpp::experimental::ServiceIntraProcessBase::SharedPtr service) const; + template< typename MessageT, typename Alloc, @@ -515,6 +630,9 @@ class IntraProcessManager PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; + ClientToServiceIdsMap clients_to_services_; + ClientMap clients_; + ServiceMap services_; mutable std::shared_timed_mutex mutex_; }; diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp new file mode 100644 index 0000000000..9a4852a6a7 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp @@ -0,0 +1,160 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rcl/error_handling.h" +#include "rcutils/logging_macros.h" + +#include "rclcpp/any_service_callback.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +template +class ServiceIntraProcess : public ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ServiceIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using RequestCallbackPair = std::pair; + using ClientIDtoRequest = std::pair; + + ServiceIntraProcess( + AnyServiceCallback callback, + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : ServiceIntraProcessBase(context, service_name, qos_profile), any_callback_(callback) + { + // Create the intra-process buffer. + buffer_ = rclcpp::experimental::create_service_intra_process_buffer< + ClientIDtoRequest>(qos_profile); + } + + virtual ~ServiceIntraProcess() = default; + + bool + is_ready(rcl_wait_set_t * wait_set) + { + (void) wait_set; + return buffer_->has_data(); + } + + void + store_intra_process_request( + uint64_t intra_process_client_id, + RequestCallbackPair request) + { + buffer_->add(std::make_pair(intra_process_client_id, std::move(request))); + gc_.trigger(); + invoke_on_new_request(); + } + + std::shared_ptr + take_data() + { + auto data = std::make_shared(std::move(buffer_->consume())); + return std::static_pointer_cast(data); + } + + void execute(std::shared_ptr & data) + { + auto ptr = std::static_pointer_cast(data); + + uint64_t intra_process_client_id = ptr->first; + SharedRequest & typed_request = ptr->second.first; + CallbackInfoVariant & value = ptr->second.second; + + SharedResponse response = any_callback_.dispatch(nullptr, nullptr, std::move(typed_request)); + + if (response) { + std::unique_lock lock(reentrant_mutex_); + + auto client_it = clients_.find(intra_process_client_id); + + if (client_it == clients_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling intra_process_service_send_response for invalid or no " + "longer existing client id"); + return; + } + + auto client_intra_process_base = client_it->second.lock(); + + if (client_intra_process_base) { + auto client = std::dynamic_pointer_cast< + rclcpp::experimental::ClientIntraProcess>( + client_intra_process_base); + client->store_intra_process_response( + std::make_pair(std::move(response), std::move(value))); + } else { + clients_.erase(client_it); + } + } + } + +protected: + using BufferUniquePtr = + typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< + ClientIDtoRequest>::UniquePtr; + + BufferUniquePtr buffer_; + + AnyServiceCallback any_callback_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp new file mode 100644 index 0000000000..a75a83226f --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp @@ -0,0 +1,211 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rcl/error_handling.h" +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/context.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +class ServiceIntraProcessBase : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase) + + enum class EntityType : std::size_t + { + Service, + }; + + RCLCPP_PUBLIC + ServiceIntraProcessBase( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : gc_(context), service_name_(service_name), qos_profile_(qos_profile) + {} + + virtual ~ServiceIntraProcessBase() = default; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() {return 1;} + + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + virtual bool + is_ready(rcl_wait_set_t * wait_set) = 0; + + virtual + std::shared_ptr + take_data() = 0; + + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + + virtual void + execute(std::shared_ptr & data) = 0; + + RCLCPP_PUBLIC + const char * + get_service_name() const; + + RCLCPP_PUBLIC + QoS + get_actual_qos() const; + + RCLCPP_PUBLIC + void + add_intra_process_client( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id); + + /// Set a callback to be called when each new request arrives. + /** + * The callback receives a size_t which is the number of requests received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if requests were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new request is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Service)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_request_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_request_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = nullptr; + } + +protected: + std::recursive_mutex reentrant_mutex_; + rclcpp::GuardCondition gc_; + std::function on_new_request_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_request() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_request_callback_) { + on_new_request_callback_(1); + } else { + unread_count_++; + } + } + + using ClientMap = + std::unordered_map; + + ClientMap clients_; + +private: + std::string service_name_; + QoS qos_profile_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/intra_process_setting.hpp b/rclcpp/include/rclcpp/intra_process_setting.hpp index 8e4b44eb64..5207622592 100644 --- a/rclcpp/include/rclcpp/intra_process_setting.hpp +++ b/rclcpp/include/rclcpp/intra_process_setting.hpp @@ -18,12 +18,12 @@ namespace rclcpp { -/// Used as argument in create_publisher and create_subscriber. +/// Used as argument when creating publishers, subscriptions, clients and services. enum class IntraProcessSetting { - /// Explicitly enable intraprocess comm at publisher/subscription level. + /// Explicitly enable intraprocess comm. Enable, - /// Explicitly disable intraprocess comm at publisher/subscription level. + /// Explicitly disable intraprocess comm. Disable, /// Take intraprocess configuration from the node. NodeDefault diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 7ecb67e9b1..99ae1e360d 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -258,6 +258,7 @@ class Node : public std::enable_shared_from_this * \param[in] service_name The topic to service on. * \param[in] qos_profile rmw_qos_profile_t Quality of service profile for client. * \param[in] group Callback group to call the service. + * \param[in] ipc_setting Intra-process communication setting for the client. * \return Shared pointer to the created client. * \deprecated use rclcpp::QoS instead of rmw_qos_profile_t */ @@ -267,7 +268,8 @@ class Node : public std::enable_shared_from_this create_client( const std::string & service_name, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group = nullptr); + rclcpp::CallbackGroup::SharedPtr group = nullptr, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault); /// Create and return a Client. /** @@ -289,6 +291,7 @@ class Node : public std::enable_shared_from_this * \param[in] callback User-defined callback function. * \param[in] qos_profile rmw_qos_profile_t Quality of service profile for client. * \param[in] group Callback group to call the service. + * \param[in] ipc_setting Intra-process communication setting for the service. * \return Shared pointer to the created service. * \deprecated use rclcpp::QoS instead of rmw_qos_profile_t */ @@ -299,7 +302,8 @@ class Node : public std::enable_shared_from_this const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group = nullptr); + rclcpp::CallbackGroup::SharedPtr group = nullptr, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault); /// Create and return a Service. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index d6b090d4d1..5a32171e07 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -157,7 +157,8 @@ typename Client::SharedPtr Node::create_client( const std::string & service_name, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting) { return rclcpp::create_client( node_base_, @@ -165,7 +166,8 @@ Node::create_client( node_services_, extend_name_with_sub_namespace(service_name, this->get_sub_namespace()), qos_profile, - group); + group, + ipc_setting); } template @@ -191,7 +193,8 @@ Node::create_service( const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting) { return rclcpp::create_service( node_base_, @@ -199,7 +202,8 @@ Node::create_service( extend_name_with_sub_namespace(service_name, this->get_sub_namespace()), std::forward(callback), qos_profile, - group); + group, + ipc_setting); } template diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index f474a67192..3f23ce401c 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -158,7 +158,7 @@ class Publisher : public PublisherBase (void)options; // If needed, setup intra process communication. - if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { + if (rclcpp::detail::resolve_use_intra_process(options_.use_intra_process_comm, *node_base)) { auto context = node_base->get_context(); // Get the intra process manager instance for this context. auto ipm = context->get_sub_context(); diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 264b6aff23..0dcd73f754 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -37,8 +37,12 @@ #include "rclcpp/any_service_callback.hpp" #include "rclcpp/clock.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/experimental/intra_process_manager.hpp" +#include "rclcpp/experimental/service_intra_process.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/qos.hpp" @@ -48,16 +52,22 @@ namespace rclcpp { +namespace node_interfaces +{ +class NodeBaseInterface; +} // namespace node_interfaces + class ServiceBase { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ServiceBase) RCLCPP_PUBLIC - explicit ServiceBase(std::shared_ptr node_handle); + explicit ServiceBase( + std::shared_ptr node_base); RCLCPP_PUBLIC - virtual ~ServiceBase() = default; + virtual ~ServiceBase(); /// Return the name of the service. /** \return The name of the service. */ @@ -162,6 +172,15 @@ class ServiceBase rclcpp::QoS get_request_subscription_actual_qos() const; + /// Return the waitable for intra-process + /** + * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup. + * \throws std::runtime_error if the intra process manager is destroyed + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_intra_process_waitable(); + /// Set a callback to be called when each new request is received. /** * The callback receives a size_t which is the number of requests received @@ -263,7 +282,20 @@ class ServiceBase void set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data); + using IntraProcessManagerWeakPtr = + std::weak_ptr; + + /// Implementation detail. + RCLCPP_PUBLIC + void + setup_intra_process( + uint64_t intra_process_service_id, + IntraProcessManagerWeakPtr weak_ipm); + + std::shared_ptr service_intra_process_; + std::shared_ptr node_handle_; + std::shared_ptr context_; std::recursive_mutex callback_mutex_; // It is important to declare on_new_request_callback_ before @@ -278,6 +310,11 @@ class ServiceBase rclcpp::Logger node_logger_; std::atomic in_use_by_wait_set_{false}; + + std::recursive_mutex ipc_mutex_; + bool use_intra_process_{false}; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_service_id_; }; template @@ -308,12 +345,14 @@ class Service * \param[in] service_name Name of the topic to publish to. * \param[in] any_callback User defined callback to call when a client request is received. * \param[in] service_options options for the subscription. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, const std::string & service_name, AnyServiceCallback any_callback, - rcl_service_options_t & service_options) + rcl_service_options_t & service_options, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) : ServiceBase(node_handle), any_callback_(any_callback), srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { @@ -359,6 +398,11 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } /// Default constructor. @@ -370,11 +414,13 @@ class Service * \param[in] node_handle NodeBaseInterface pointer that is used in part of the setup. * \param[in] service_handle service handle. * \param[in] any_callback User defined callback to call when a client request is received. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, std::shared_ptr service_handle, - AnyServiceCallback any_callback) + AnyServiceCallback any_callback, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) : ServiceBase(node_handle), any_callback_(any_callback), srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { @@ -394,6 +440,11 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } /// Default constructor. @@ -405,11 +456,13 @@ class Service * \param[in] node_handle NodeBaseInterface pointer that is used in part of the setup. * \param[in] service_handle service handle. * \param[in] any_callback User defined callback to call when a client request is received. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, rcl_service_t * service_handle, - AnyServiceCallback any_callback) + AnyServiceCallback any_callback, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) : ServiceBase(node_handle), any_callback_(any_callback), srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle()) { @@ -431,12 +484,28 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } Service() = delete; virtual ~Service() { + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a service."); + return; + } + ipm->remove_service(intra_process_service_id_); } /// Take the next request from the service. @@ -499,6 +568,40 @@ class Service } } + void + create_intra_process_service() + { + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_request_subscription_actual_qos(); + + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { + throw std::invalid_argument( + "intraprocess communication allowed only with keep last history qos policy"); + } + if (qos_profile.depth() == 0) { + throw std::invalid_argument( + "intraprocess communication is not allowed with 0 depth qos policy"); + } + if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { + throw std::invalid_argument( + "intraprocess communication allowed only with volatile durability"); + } + + // Create a ServiceIntraProcess which will be given to the intra-process manager. + using ServiceIntraProcessT = rclcpp::experimental::ServiceIntraProcess; + + service_intra_process_ = std::make_shared( + any_callback_, + context_, + this->get_service_name(), + qos_profile); + + using rclcpp::experimental::IntraProcessManager; + auto ipm = context_->get_sub_context(); + uint64_t intra_process_service_id = ipm->add_intra_process_service(service_intra_process_); + this->setup_intra_process(intra_process_service_id, ipm); + } + /// Configure client introspection. /** * \param[in] clock clock to use to generate introspection timestamps diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index eb1a980dd3..400941a9aa 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -150,7 +150,7 @@ class Subscription : public SubscriptionBase message_memory_strategy_(message_memory_strategy) { // Setup intra process publishing if requested. - if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { + if (rclcpp::detail::resolve_use_intra_process(options_.use_intra_process_comm, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; // Check if the QoS is compatible with intra-process. diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 615f3852b6..e3b6840da1 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -288,7 +288,7 @@ class SubscriptionBase : public std::enable_shared_from_this using IntraProcessManagerWeakPtr = std::weak_ptr; - /// Implemenation detail. + /// Implementation detail. RCLCPP_PUBLIC void setup_intra_process( diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index e33db71ce2..61486060ab 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -67,6 +67,26 @@ ClientBase::ClientBase( }); } +ClientBase::~ClientBase() +{ + clear_on_new_response_callback(); + // Make sure the client handle is destructed as early as possible and before the node handle + client_handle_.reset(); + + std::lock_guard lock(ipc_mutex_); + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a client."); + return; + } + ipm->remove_client(intra_process_client_id_); +} + bool ClientBase::take_type_erased_response(void * response_out, rmw_request_id_t & request_header_out) { @@ -246,3 +266,34 @@ ClientBase::set_on_new_response_callback(rcl_event_callback_t callback, const vo throw_from_rcl_error(ret, "failed to set the on new response callback for client"); } } + +void +ClientBase::setup_intra_process( + uint64_t intra_process_client_id, + IntraProcessManagerWeakPtr weak_ipm) +{ + std::lock_guard lock(ipc_mutex_); + weak_ipm_ = weak_ipm; + use_intra_process_ = true; + intra_process_client_id_ = intra_process_client_id; +} + +rclcpp::Waitable::SharedPtr +ClientBase::get_intra_process_waitable() +{ + std::lock_guard lock(ipc_mutex_); + // If not using intra process, shortcut to nullptr. + if (!use_intra_process_) { + return nullptr; + } + // Get the intra process manager. + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "ClientBase::get_intra_process_waitable() called " + "after destruction of intra process manager"); + } + + // Use the id to retrieve the intra-process client from the intra-process manager. + return ipm->get_client_intra_process(intra_process_client_id_); +} diff --git a/rclcpp/src/rclcpp/client_intra_process_base.cpp b/rclcpp/src/rclcpp/client_intra_process_base.cpp new file mode 100644 index 0000000000..b445f32937 --- /dev/null +++ b/rclcpp/src/rclcpp/client_intra_process_base.cpp @@ -0,0 +1,36 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" + +using rclcpp::experimental::ClientIntraProcessBase; + +void +ClientIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); +} + +const char * +ClientIntraProcessBase::get_service_name() const +{ + return service_name_.c_str(); +} + +rclcpp::QoS +ClientIntraProcessBase::get_actual_qos() const +{ + return qos_profile_; +} diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index efce4afeaf..48a8cfe8e4 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -25,12 +25,6 @@ namespace experimental static std::atomic _next_unique_id {1}; -IntraProcessManager::IntraProcessManager() -{} - -IntraProcessManager::~IntraProcessManager() -{} - uint64_t IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) { @@ -82,6 +76,73 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su return sub_id; } +uint64_t +IntraProcessManager::add_intra_process_client(ClientIntraProcessBase::SharedPtr client) +{ + std::unique_lock lock(mutex_); + + uint64_t client_id = IntraProcessManager::get_next_unique_id(); + clients_[client_id] = client; + + // adds the client to the matchable service + for (auto & pair : services_) { + auto intra_process_service = pair.second.lock(); + if (!intra_process_service) { + continue; + } + if (can_communicate(client, intra_process_service)) { + uint64_t service_id = pair.first; + clients_to_services_.emplace(client_id, service_id); + intra_process_service->add_intra_process_client(client, client_id); + break; + } + } + + return client_id; +} + +uint64_t +IntraProcessManager::add_intra_process_service(ServiceIntraProcessBase::SharedPtr service) +{ + std::unique_lock lock(mutex_); + + // First check if we have already a service registered with same service name + // Todo: Open issue about this not being enforced with normal services + auto it = services_.begin(); + + while (it != services_.end()) { + auto srv = it->second.lock(); + if (srv) { + if (srv->get_service_name() == service->get_service_name()) { + throw std::runtime_error( + "Can't have multiple services with same service name."); + } + it++; + } else { + it = services_.erase(it); + } + } + + uint64_t service_id = IntraProcessManager::get_next_unique_id(); + services_[service_id] = service; + + // adds the service to the matchable clients + for (auto & pair : clients_) { + auto client = pair.second.lock(); + if (!client) { + continue; + } + if (can_communicate(client, service)) { + uint64_t client_id = pair.first; + clients_to_services_.emplace(client_id, service_id); + + service->add_intra_process_client(client, client_id); + } + } + + return service_id; +} + void IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) { @@ -115,6 +176,33 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) pub_to_subs_.erase(intra_process_publisher_id); } +void +IntraProcessManager::remove_client(uint64_t intra_process_client_id) +{ + std::unique_lock lock(mutex_); + + clients_.erase(intra_process_client_id); + clients_to_services_.erase(intra_process_client_id); +} + +void +IntraProcessManager::remove_service(uint64_t intra_process_service_id) +{ + std::unique_lock lock(mutex_); + + services_.erase(intra_process_service_id); + + auto it = clients_to_services_.begin(); + + while (it != clients_to_services_.end()) { + if (it->second == intra_process_service_id) { + it = clients_to_services_.erase(it); + } else { + it++; + } + } +} + bool IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const { @@ -172,6 +260,58 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc } } +ServiceIntraProcessBase::SharedPtr +IntraProcessManager::get_service_intra_process(uint64_t intra_process_service_id) +{ + std::shared_lock lock(mutex_); + + auto service_it = services_.find(intra_process_service_id); + if (service_it == services_.end()) { + return nullptr; + } else { + auto service = service_it->second.lock(); + if (service) { + return service; + } else { + services_.erase(service_it); + return nullptr; + } + } +} + +bool +IntraProcessManager::service_is_available(uint64_t intra_process_client_id) +{ + std::shared_lock lock(mutex_); + + auto service_it = clients_to_services_.find(intra_process_client_id); + + if (service_it != clients_to_services_.end()) { + // A server matching the client has been found + return true; + } + return false; +} + +ClientIntraProcessBase::SharedPtr +IntraProcessManager::get_client_intra_process(uint64_t intra_process_client_id) +{ + std::shared_lock lock(mutex_); + + auto client_it = clients_.find(intra_process_client_id); + if (client_it == clients_.end()) { + return nullptr; + } else { + auto client = client_it->second.lock(); + if (client) { + return client; + } else { + clients_.erase(client_it); + return nullptr; + } + } +} + uint64_t IntraProcessManager::get_next_unique_id() { @@ -225,5 +365,25 @@ IntraProcessManager::can_communicate( return true; } +bool +IntraProcessManager::can_communicate( + ClientIntraProcessBase::SharedPtr client, + ServiceIntraProcessBase::SharedPtr service) const +{ + // client and service must be under the same name + if (strcmp(client->get_service_name(), service->get_service_name()) != 0) { + return false; + } + + auto check_result = rclcpp::qos_check_compatible( + client->get_actual_qos(), service->get_actual_qos()); + + if (check_result.compatibility == rclcpp::QoSCompatibility::Error) { + return false; + } + + return true; +} + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp index e9c4a5266e..f3912c2cda 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp @@ -41,6 +41,12 @@ NodeServices::add_service( group->add_service(service_base_ptr); + auto service_intra_process_waitable = service_base_ptr->get_intra_process_waitable(); + if (nullptr != service_intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + group->add_waitable(service_intra_process_waitable); + } + // Notify the executor that a new service was created using the parent Node. try { node_base_->trigger_notify_guard_condition(); @@ -67,6 +73,12 @@ NodeServices::add_client( group->add_client(client_base_ptr); + auto client_intra_process_waitable = client_base_ptr->get_intra_process_waitable(); + if (nullptr != client_intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + group->add_waitable(client_intra_process_waitable); + } + // Notify the executor that a new client was created using the parent Node. try { node_base_->trigger_notify_guard_condition(); diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index 9c246e4b6b..c97b2927a6 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -21,6 +21,7 @@ #include #include "rclcpp/any_service_callback.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/qos.hpp" #include "rmw/error_handling.h" @@ -28,11 +29,30 @@ using rclcpp::ServiceBase; -ServiceBase::ServiceBase(std::shared_ptr node_handle) -: node_handle_(node_handle), - node_logger_(rclcpp::get_node_logger(node_handle_.get())) +ServiceBase::ServiceBase( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base) +: node_handle_(node_base->get_shared_rcl_node_handle()), + context_(node_base->get_context()), + node_logger_(rclcpp::get_node_logger(node_base->get_shared_rcl_node_handle().get())) {} +ServiceBase::~ServiceBase() +{ + clear_on_new_request_callback(); + + std::lock_guard lock(ipc_mutex_); + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a service."); + return; + } + ipm->remove_service(intra_process_service_id_); +} bool ServiceBase::take_type_erased_request(void * request_out, rmw_request_id_t & request_id_out) @@ -123,6 +143,37 @@ ServiceBase::get_request_subscription_actual_qos() const return request_subscription_qos; } +void +ServiceBase::setup_intra_process( + uint64_t intra_process_service_id, + IntraProcessManagerWeakPtr weak_ipm) +{ + std::lock_guard lock(ipc_mutex_); + intra_process_service_id_ = intra_process_service_id; + weak_ipm_ = weak_ipm; + use_intra_process_ = true; +} + +rclcpp::Waitable::SharedPtr +ServiceBase::get_intra_process_waitable() +{ + std::lock_guard lock(ipc_mutex_); + // If not using intra process, shortcut to nullptr. + if (!use_intra_process_) { + return nullptr; + } + // Get the intra process manager. + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "ServiceBase::get_intra_process_waitable() called " + "after destruction of intra process manager"); + } + + // Use the id to retrieve the intra-process service from the intra-process manager. + return ipm->get_service_intra_process(intra_process_service_id_); +} + void ServiceBase::set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data) { diff --git a/rclcpp/src/rclcpp/service_intra_process_base.cpp b/rclcpp/src/rclcpp/service_intra_process_base.cpp new file mode 100644 index 0000000000..2ccb38779c --- /dev/null +++ b/rclcpp/src/rclcpp/service_intra_process_base.cpp @@ -0,0 +1,45 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" + +using rclcpp::experimental::ServiceIntraProcessBase; + +void +ServiceIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); +} + +const char * +ServiceIntraProcessBase::get_service_name() const +{ + return service_name_.c_str(); +} + +rclcpp::QoS +ServiceIntraProcessBase::get_actual_qos() const +{ + return qos_profile_; +} + +void +ServiceIntraProcessBase::add_intra_process_client( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id) +{ + std::unique_lock lock(reentrant_mutex_); + clients_[client_id] = client; +} diff --git a/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp b/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp index ea55a1aac2..9a6353e7d6 100644 --- a/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp +++ b/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp @@ -30,7 +30,7 @@ class TestService : public rclcpp::ServiceBase { public: explicit TestService(rclcpp::Node * node) - : rclcpp::ServiceBase(node->get_node_base_interface()->get_shared_rcl_node_handle()) {} + : rclcpp::ServiceBase(node->get_node_base_interface()) {} std::shared_ptr create_request() override {return nullptr;} std::shared_ptr create_request_header() override {return nullptr;} @@ -41,7 +41,8 @@ class TestClient : public rclcpp::ClientBase { public: explicit TestClient(rclcpp::Node * node) - : rclcpp::ClientBase(node->get_node_base_interface().get(), node->get_node_graph_interface()) {} + : rclcpp::ClientBase(node->get_node_base_interface().get(), node->get_node_graph_interface()) + {} std::shared_ptr create_response() override {return nullptr;} std::shared_ptr create_request_header() override {return nullptr;} diff --git a/rclcpp/test/rclcpp/test_client.cpp b/rclcpp/test/rclcpp/test_client.cpp index 9070e1caa9..b959c0ea41 100644 --- a/rclcpp/test/rclcpp/test_client.cpp +++ b/rclcpp/test/rclcpp/test_client.cpp @@ -133,7 +133,8 @@ TEST_F(TestClient, construction_with_free_function) { node->get_node_services_interface(), "service", rmw_qos_profile_services_default, - nullptr); + nullptr, + rclcpp::IntraProcessSetting::Disable); } { ASSERT_THROW( @@ -144,7 +145,8 @@ TEST_F(TestClient, construction_with_free_function) { node->get_node_services_interface(), "invalid_?service", rmw_qos_profile_services_default, - nullptr); + nullptr, + rclcpp::IntraProcessSetting::Disable); }, rclcpp::exceptions::InvalidServiceNameError); } { diff --git a/rclcpp/test/rclcpp/test_externally_defined_services.cpp b/rclcpp/test/rclcpp/test_externally_defined_services.cpp index 563f7f6ec3..3938c847c7 100644 --- a/rclcpp/test/rclcpp/test_externally_defined_services.cpp +++ b/rclcpp/test/rclcpp/test_externally_defined_services.cpp @@ -66,7 +66,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_uninitialized) { // expect fail try { rclcpp::Service( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); } catch (const std::runtime_error &) { SUCCEED(); @@ -97,7 +97,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_initialized) { try { rclcpp::Service( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); } catch (const std::runtime_error &) { FAIL(); @@ -137,7 +137,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_destructor) { { // Call constructor rclcpp::Service srv_cpp( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); // Call destructor } diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45d916b004..70718bb313 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -15,9 +15,12 @@ #include #include +#include #include #include +#include #include +#include // NOLINT #include #define RCLCPP_BUILDING_LIBRARY 1 @@ -304,6 +307,98 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< } }; +class ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase) + + const char * + get_service_name() const + { + return nullptr; + } + + QoS get_actual_qos() const + { + QoS qos(0); + return qos; + } +}; + +template +class ClientIntraProcess : public ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcess) +}; + +class ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase) + + void + add_intra_process_client( + rclcpp::experimental::mock::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id) + { + (void)client; + (void)client_id; + } + + const char * + get_service_name() const + { + return nullptr; + } + + QoS get_actual_qos() const + { + QoS qos(0); + return qos; + } +}; + +template +class ServiceIntraProcess : public ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using RequestCallbackPair = std::pair; + using ClientIDtoRequest = std::pair; + + void + store_intra_process_request( + uint64_t intra_process_client_id, + RequestCallbackPair request) + { + (void)intra_process_client_id; + (void)request; + } +}; + } // namespace mock } // namespace experimental } // namespace rclcpp @@ -314,6 +409,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase @@ -321,12 +420,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase #define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer #define SubscriptionIntraProcess mock::SubscriptionIntraProcess +#define ServiceIntraProcessBase mock::ServiceIntraProcessBase +#define ServiceIntraProcess mock::ServiceIntraProcess +#define ClientIntraProcessBase mock::ClientIntraProcessBase +#define ClientIntraProcess mock::ClientIntraProcess #include "../src/rclcpp/intra_process_manager.cpp" // NOLINT #undef Publisher #undef PublisherBase #undef IntraProcessBuffer #undef SubscriptionIntraProcessBase #undef SubscriptionIntraProcess +#undef ServiceIntraProcessBase +#undef ServiceIntraProcess +#undef ClientIntraProcessBase +#undef ClientIntraProcess using ::testing::_; using ::testing::UnorderedElementsAre; diff --git a/rclcpp/test/rclcpp/test_service.cpp b/rclcpp/test/rclcpp/test_service.cpp index a3c361cfde..b9ab010e0f 100644 --- a/rclcpp/test/rclcpp/test_service.cpp +++ b/rclcpp/test/rclcpp/test_service.cpp @@ -19,6 +19,7 @@ #include #include "rclcpp/exceptions.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/rclcpp.hpp" #include "../mocking_utils/patch.hpp" @@ -165,18 +166,28 @@ TEST_F(TestService, basic_public_getters) { rcl_service_options_t service_options = rcl_service_get_default_options(); const rosidl_service_type_support_t * ts = rosidl_typesupport_cpp::get_service_type_support_handle(); + auto node_base_interface = node_handle_int->get_node_base_interface(); rcl_ret_t ret = rcl_service_init( &service_handle, - node_handle_int->get_node_base_interface()->get_rcl_node_handle(), + node_base_interface->get_rcl_node_handle(), ts, "base_node_service", &service_options); if (ret != RCL_RET_OK) { FAIL(); return; } rclcpp::AnyServiceCallback cb; + + + rclcpp::IntraProcessSetting ipc_setting; + if (node_base_interface->get_use_intra_process_default()) { + ipc_setting = rclcpp::IntraProcessSetting::Enable; + } else { + ipc_setting = rclcpp::IntraProcessSetting::Disable; + } + const rclcpp::Service base( - node_handle_int->get_node_base_interface()->get_shared_rcl_node_handle(), - &service_handle, cb); + node_handle_int->get_node_base_interface(), + &service_handle, cb, ipc_setting); // Use get_service_handle specific to const service std::shared_ptr const_service_handle = base.get_service_handle(); EXPECT_NE(nullptr, const_service_handle); diff --git a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.cpp b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.cpp index 5c5f7797e1..6145bf6805 100644 --- a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.cpp +++ b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.cpp @@ -81,6 +81,13 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf state_machine_options.enable_com_interface = enable_communication_interface; state_machine_options.allocator = node_options->allocator; + rclcpp::IntraProcessSetting ipc_setting; + if (node_base_interface_->get_use_intra_process_default()) { + ipc_setting = rclcpp::IntraProcessSetting::Enable; + } else { + ipc_setting = rclcpp::IntraProcessSetting::Disable; + } + // The call to initialize the state machine takes // currently five different typesupports for all publishers/services // created within the RCL_LIFECYCLE structure. @@ -116,9 +123,11 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf any_cb.set(std::move(cb)); srv_change_state_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_change_state, - any_cb); + any_cb, + ipc_setting); + node_services_interface_->add_service( std::dynamic_pointer_cast(srv_change_state_), nullptr); @@ -132,9 +141,11 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf any_cb.set(std::move(cb)); srv_get_state_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_state, - any_cb); + any_cb, + ipc_setting); + node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_state_), nullptr); @@ -148,9 +159,11 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf any_cb.set(std::move(cb)); srv_get_available_states_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_available_states, - any_cb); + any_cb, + ipc_setting); + node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_available_states_), nullptr); @@ -165,9 +178,11 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf srv_get_available_transitions_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_available_transitions, - any_cb); + any_cb, + ipc_setting); + node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_available_transitions_), nullptr); @@ -182,9 +197,11 @@ LifecycleNode::LifecycleNodeInterfaceImpl::init(bool enable_communication_interf srv_get_transition_graph_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_transition_graph, - any_cb); + any_cb, + ipc_setting); + node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_transition_graph_), nullptr); diff --git a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp index d09f44831c..bda352d3c2 100644 --- a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp +++ b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp @@ -30,6 +30,7 @@ #include "rcl_lifecycle/rcl_lifecycle.h" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_services_interface.hpp"